-
Notifications
You must be signed in to change notification settings - Fork 1.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Assign relative offsets in compressed message sets #1002
Conversation
I'm sorry to say that I can't quite tell what made Travis CI fail there. It looks like it could be related to Travis's recent image update (others seem to be reporting similar problems such as travis-ci/travis-ci#8898), but if you can point me at any issues with the patch itself I'll definitely address them. Thank you! |
Thanks, this looks great. Just one question, which I'd raised on #903 as well: do we need to do something to maintain backwards-compatibility? Is this supported by older brokers or do we need to guard this behaviour with a version check? Re. Travis, it has been really flaky recently; the issue you linked seems related. I'll see if I can get it working again, but it doesn't seem related to this PR specifically. |
Thank you!
Ah, right, thanks! This change is backwards compatible: Kafka brokers prior to version 0.10 always need to assign absolute offsets to the inner messages, and so they ignore any offsets supplied by the producer. However, it's simple to add a version check so that we don't need to worry about that at all; I'll update the PR. |
(note: PR updated to add Kafka version check!) |
This looks great to me thanks! Just rebase on master so CI runs properly and I'll merge it. |
As previously noted (IBM#903), Sarama currently doesn't assign inner offsets in compressed messages. This means that the broker has to assign relative offsets and then do recompression. Which is fine! But if we assign offsets in the producer, then the broker can skip recompression, improving broker CPU efficiency. One way to see the effect is by adding some debug logging to Kafka with the following patch: ``` diff --git a/core/src/main/scala/kafka/log/LogValidator.scala b/core/src/main/scala/kafka/log/LogValidator.scala index 224a79277..4ecb98687 100644 --- a/core/src/main/scala/kafka/log/LogValidator.scala +++ b/core/src/main/scala/kafka/log/LogValidator.scala @@ -20,6 +20,7 @@ import java.nio.ByteBuffer import kafka.common.LongRef import kafka.message.{CompressionCodec, InvalidMessageException, NoCompressionCodec} +import org.apache.log4j.Logger import org.apache.kafka.common.errors.InvalidTimestampException import org.apache.kafka.common.record._ @@ -27,6 +28,7 @@ import scala.collection.mutable import scala.collection.JavaConverters._ private[kafka] object LogValidator { + private val log = Logger.getLogger(getClass()) /** * Update the offsets for this message set and do further validation on messages including: @@ -172,8 +174,13 @@ private[kafka] object LogValidator { // Validate the timestamp validateTimestamp(record, now, messageTimestampType, messageTimestampDiffMaxMs) // Check if we need to overwrite offset, no in place assignment situation 3 - if (logEntry.offset != expectedInnerOffset.getAndIncrement()) + val incrementedInnerOffset = expectedInnerOffset.getAndIncrement() + if (logEntry.offset != incrementedInnerOffset) { + log.info("overwriting inner offset; got " + logEntry.offset + " expected " + incrementedInnerOffset) inPlaceAssignment = false + } else { + log.info("no need to overwrite offset; got " + logEntry.offset + " expected " + incrementedInnerOffset) + } if (record.timestamp > maxTimestamp) maxTimestamp = record.timestamp } ``` This produces the following broker logs: Before: ``` [2017-12-13 13:30:04,667] INFO no need to overwrite offset; got 0 expected 0 (kafka.log.LogValidator$) [2017-12-13 13:30:01,465] INFO overwriting inner offset; got 0 expected 1 (kafka.log.LogValidator$) [2017-12-13 13:30:01,465] INFO overwriting inner offset; got 0 expected 2 (kafka.log.LogValidator$) [2017-12-13 13:30:01,465] INFO overwriting inner offset; got 0 expected 3 (kafka.log.LogValidator$) [2017-12-13 13:30:01,465] INFO overwriting inner offset; got 0 expected 4 (kafka.log.LogValidator$) [2017-12-13 13:30:01,465] INFO overwriting inner offset; got 0 expected 5 (kafka.log.LogValidator$) [2017-12-13 13:30:01,465] INFO overwriting inner offset; got 0 expected 6 (kafka.log.LogValidator$) [2017-12-13 13:30:01,465] INFO overwriting inner offset; got 0 expected 7 (kafka.log.LogValidator$) [2017-12-13 13:30:01,465] INFO overwriting inner offset; got 0 expected 8 (kafka.log.LogValidator$) [2017-12-13 13:30:01,465] INFO overwriting inner offset; got 0 expected 9 (kafka.log.LogValidator$) ``` After: ``` [2017-12-13 13:40:57,995] INFO no need to overwrite offset; got 0 expected 0 (kafka.log.LogValidator$) [2017-12-13 13:40:57,995] INFO no need to overwrite offset; got 1 expected 1 (kafka.log.LogValidator$) [2017-12-13 13:40:57,995] INFO no need to overwrite offset; got 2 expected 2 (kafka.log.LogValidator$) [2017-12-13 13:40:57,995] INFO no need to overwrite offset; got 3 expected 3 (kafka.log.LogValidator$) [2017-12-13 13:40:57,995] INFO no need to overwrite offset; got 4 expected 4 (kafka.log.LogValidator$) [2017-12-13 13:40:57,995] INFO no need to overwrite offset; got 5 expected 5 (kafka.log.LogValidator$) [2017-12-13 13:40:57,995] INFO no need to overwrite offset; got 6 expected 6 (kafka.log.LogValidator$) [2017-12-13 13:40:57,995] INFO no need to overwrite offset; got 7 expected 7 (kafka.log.LogValidator$) [2017-12-13 13:40:57,995] INFO no need to overwrite offset; got 8 expected 8 (kafka.log.LogValidator$) [2017-12-13 13:40:57,995] INFO no need to overwrite offset; got 9 expected 9 (kafka.log.LogValidator$) ```
f4a007f
to
f0d0b0f
Compare
Excellent, thanks. Rebased and CI looks happy. |
It looks like this doesn't address recompression in 1.0.0. I'm not very familiar with the code, but maybe it's because this change doesn't affect 0.11.0.0+, which goes here: if req.Version >= 3 {
req.AddBatch(topic, partition, set.recordsToSend.recordBatch)
continue
} We're seeing high broker CPU usage with Snappy compression, broker stack looks like this:
|
I pin-pointed the issue to this lines: My patch for Kafka to get logs: diff --git a/core/src/main/scala/kafka/log/LogValidator.scala b/core/src/main/scala/kafka/log/LogValidator.scala
index 15750e9cd..5197d0885 100644
--- a/core/src/main/scala/kafka/log/LogValidator.scala
+++ b/core/src/main/scala/kafka/log/LogValidator.scala
@@ -21,6 +21,7 @@ import java.nio.ByteBuffer
import kafka.common.LongRef
import kafka.message.{CompressionCodec, NoCompressionCodec}
import kafka.utils.Logging
+import org.apache.log4j.Logger
import org.apache.kafka.common.errors.{InvalidTimestampException, UnsupportedForMessageFormatException}
import org.apache.kafka.common.record._
import org.apache.kafka.common.utils.Time
@@ -236,6 +237,7 @@ private[kafka] object LogValidator extends Logging {
// No in place assignment situation 1 and 2
var inPlaceAssignment = sourceCodec == targetCodec && toMagic > RecordBatch.MAGIC_VALUE_V0
+ logger.info("inPlaceAssignment = " + inPlaceAssignment + ", condition: sourceCodec (" + sourceCodec + ") == targetCodec (" + targetCodec + ") && toMagic (" + toMagic + ") > RecordBatch.MAGIC_VALUE_V0 (" + RecordBatch.MAGIC_VALUE_V0 + ")")
var maxTimestamp = RecordBatch.NO_TIMESTAMP
val expectedInnerOffset = new LongRef(0)
@@ -250,6 +252,7 @@ private[kafka] object LogValidator extends Logging {
// Do not compress control records unless they are written compressed
if (sourceCodec == NoCompressionCodec && batch.isControlBatch)
inPlaceAssignment = true
+ logger.info("inPlaceAssignment = " + inPlaceAssignment + ", condition: sourceCodec (" + sourceCodec + ") == NoCompressionCodec (" + NoCompressionCodec + ") && batch.isControlBatch (" + batch.isControlBatch + ")")
for (record <- batch.asScala) {
validateRecord(batch, record, now, timestampType, timestampDiffMaxMs, compactedTopic)
@@ -261,21 +264,26 @@ private[kafka] object LogValidator extends Logging {
if (batch.magic > RecordBatch.MAGIC_VALUE_V0 && toMagic > RecordBatch.MAGIC_VALUE_V0) {
// Check if we need to overwrite offset
// No in place assignment situation 3
- if (record.offset != expectedInnerOffset.getAndIncrement())
+ val off = expectedInnerOffset.getAndIncrement()
+ if (record.offset != off)
inPlaceAssignment = false
+ logger.info("inPlaceAssignment = " + inPlaceAssignment + ", condition: record.offset (" + record.offset + ") != expectedInnerOffset.getAndIncrement() (" + off + ")")
if (record.timestamp > maxTimestamp)
maxTimestamp = record.timestamp
}
// No in place assignment situation 4
- if (!record.hasMagic(toMagic))
+ if (!record.hasMagic(toMagic)) {
+ logger.info("inPlaceAssignment = " + inPlaceAssignment + ", condition: !record.hasMagic(toMagic) (" + !record.hasMagic(toMagic) + ")")
inPlaceAssignment = false
+ }
validatedRecords += record
}
}
if (!inPlaceAssignment) {
+ logger.info("inPlaceAssignment = " + inPlaceAssignment + "; recompressing")
val (producerId, producerEpoch, sequence, isTransactional) = {
// note that we only reassign offsets for requests coming straight from a producer. For records with magic V2,
// there should be exactly one RecordBatch per request, so the following is all we need to do. For Records Log output:
|
I think I got it: diff --git a/vendor/github.com/Shopify/sarama/produce_set.go b/vendor/github.com/Shopify/sarama/produce_set.go
index 61eb3f0..b9eac95 100644
--- a/vendor/github.com/Shopify/sarama/produce_set.go
+++ b/vendor/github.com/Shopify/sarama/produce_set.go
@@ -122,6 +122,10 @@ func (ps *produceSet) buildRequest() *ProduceRequest {
for topic, partitionSet := range ps.msgs {
for partition, set := range partitionSet {
if req.Version >= 3 {
+ for i, record := range set.recordsToSend.recordBatch.Records {
+ record.OffsetDelta = int64(i)
+ }
+
req.AddBatch(topic, partition, set.recordsToSend.recordBatch)
continue
} @emfree, @eapache, does this seem reasonable to you? Should I make a PR? |
This mimicks IBM#1002, but for v3 requests. On Kafka 1.0.0 this prevents recompress on broker to add missing offsets.
As previously noted (#903), Sarama currently doesn't assign inner
offsets in compressed messages. This means that the broker has to assign
relative offsets and then do recompression.
Which is fine! But if we assign offsets in the producer, then the broker can
skip recompression, improving broker CPU efficiency.
One way to see the effect is by adding some debug logging to Kafka with
the following patch:
This produces the following broker logs:
Before:
After: